HEX
Server: LiteSpeed
System: Linux eticaretsrv4.isimtescil.net 3.10.0-962.3.2.lve1.5.26.7.el7.x86_64 #1 SMP Wed Oct 2 07:53:12 EDT 2019 x86_64
User: sioberen (1086)
PHP: 7.3.33
Disabled: NONE
Upload Files
File: //opt/alt/python37/lib/python3.7/site-packages/cldiaglib.py
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

from __future__ import print_function
from __future__ import absolute_import
import grp
import pwd
import sys

from future.moves import configparser as ConfigParser
import json
import os
import subprocess
from collections import namedtuple, OrderedDict

from future.utils import iteritems

import cldetectlib as detect
from cllimits_validator import LimitsValidator
from clcommon.utils import ExternalProgramFailed

# Possible result types of a single checker (ChkStatus?).
# runner() counts FAILED and INTERNAL_TEST_ERROR as errors; runner() itself
# produces INTERNAL_TEST_ERROR when a checker raises an exception.

OK = 'OK'   # 'PASSED' is better?
FAILED = 'FAILED'
SKIPPED = 'SKIPPED'
INTERNAL_TEST_ERROR = 'INTERNAL_TEST_ERROR'
# Base documentation URL; checkers append a '#<anchor>' fragment to it
# when building their "See details" hints.
cldiag_doc_link = 'https://docs.cloudlinux.com/cldiag.html'

# Value returned by every checker in this module.
ChkResult = namedtuple('ChkResult', [
    'res',          # One of predefined checker result types
    'msg',          # Resulting msg from this checker
])

# Location of the suexec binary, keyed by control panel name.
# Passed as 'location' to detect.check_binary_has_jail() by binary_check().
# NOTE(review): presumably the keys match the names returned by
# detect.getCPName() - confirm against cldetectlib.
SUEXEC_PATH = {
    'cPanel':       '/usr/local/apache/bin/suexec',
    'cPanel_ea4':   '/usr/sbin/suexec',
    'DirectAdmin':  '/usr/sbin/suexec',
    'Plesk':        '/usr/sbin/suexec',
    'ISPManager':   '/usr/sbin/suexec',
    'InterWorx':    '/usr/sbin/suexec',
    'H-Sphere':     '/usr/sbin/suexec',
    'HostingNG':    '/usr/sbin/suexec',
    'Unknown':      '/usr/sbin/suexec',
}

# Location of the suphp binary, keyed by control panel name.
# Passed as 'location' to detect.check_binary_has_jail() by binary_check().
# NOTE(review): presumably the keys match the names returned by
# detect.getCPName() - confirm against cldetectlib.
SUPHP_PATH = {
    'cPanel': '/opt/suphp/sbin/suphp',
    'cPanel_ea4': '/usr/sbin/suphp',
    'DirectAdmin':  '/usr/local/suphp/sbin/suphp',
    'Plesk':        '/usr/sbin/suphp',
    'ISPManager':   '/usr/sbin/suphp',
    'InterWorx':    '/usr/sbin/suphp',
    'H-Sphere':     '/usr/sbin/suphp',
    'HostingNG':    '/usr/sbin/suphp',
    'Unknown':      '/usr/sbin/suphp',
}

# Parameters consumed by binary_check(), keyed by module id:
#   name            - human readable module name used in messages
#   status_function - expression string evaluated by wrapper() to find out
#                     whether the module is enabled
#   location        - per-control-panel table of binary paths
BINARY_CHECK_PARAMETERS = {
    'suphp': {
        'name': 'SuPHP',
        'status_function': 'detect.get_suPHP_status()',
        'location': SUPHP_PATH,
    },
    'suexec': {
        'name': 'SuEXEC',
        'status_function': 'detect.get_suEXEC_status()',
        'location': SUEXEC_PATH,
    },
}


def pretty_name(name_of_checker):
    """
    Build a decorator that labels a checker with a human readable name
    :param name_of_checker: text stored in the `pretty_name` attribute
    :return: decorator returning the very same function object
    """
    def attach_label(checker):
        setattr(checker, 'pretty_name', name_of_checker)
        return checker

    return attach_label


def formatter(data, error_count, to_json=False):
    """
    Render checker results either as JSON or as plain text
    :param data: mapping of checker pretty name -> ChkResult
    :param error_count: number of errors to report in the summary
    :param to_json: when True return a JSON string, otherwise plain text
    :return: formatted report as a string
    """
    if not to_json:
        sections = [
            "{}:\n    {}: {}".format(title, outcome.res, outcome.msg)
            for title, outcome in iteritems(data)
        ]
        sections.append('There are {} errors found.'.format(error_count))
        return '\n\n'.join(sections)

    payload = {}
    for title, outcome in iteritems(data):
        payload[title] = outcome._asdict()
    payload['total_errors'] = error_count
    return json.dumps(payload)


def runner(checkers, to_json=False, do_exit=True):
    """
    Run checkers one by one and build a report from their results
    :param checkers: iterable of checker callables or a single callable
    :param to_json: pass-through flag for formatter (JSON or plain text)
    :param do_exit: when True print the report and terminate the process
                    with the number of errors as exit status
    :return: tuple (errors, report) when do_exit is False
    """
    if callable(checkers):  # allow single checker as input too
        checkers = [checkers]

    results_dict = OrderedDict()
    errors = 0
    for checker in checkers:
        try:
            chk_result = checker()
        except Exception as e:
            # a crashing checker must not abort the whole run
            chk_result = ChkResult(INTERNAL_TEST_ERROR, repr(e))
        if chk_result.res in (FAILED, INTERNAL_TEST_ERROR):
            errors += 1
        # checkers are normally labelled by @pretty_name; fall back to the
        # function name so an undecorated checker does not break the report
        title = getattr(checker, 'pretty_name',
                        getattr(checker, '__name__', repr(checker)))
        results_dict[title] = chk_result

    res = formatter(results_dict, errors, to_json)

    if do_exit:
        print(res)
        # sys.exit works even when the `site` module (which injects the
        # interactive `exit` helper) is not loaded
        sys.exit(errors)
    return errors, res


def wrapper(func):
    """
    Evaluate an expression given as a string and return its value
    :param func: expression source, e.g. 'detect.get_suPHP_status()'
    :return: value of the expression; False (after printing a warning)
             when the evaluation raises AttributeError
    """
    # NOTE: eval is applied to the string as is; callers must pass only
    # trusted, hard-coded expressions
    try:
        outcome = eval(func)
    except AttributeError:
        warning = 'WARNING\n missing {} function in cldetectlib.'.format(func)
        print(warning)
        return False
    return outcome


@pretty_name('Check cagefs')
def fake_cagefs_checker():
    """
    Placeholder checker: always reports SKIPPED with an upgrade advice
    :return: ChkResult
    """
    advice = ''.join((
        'Cagefs version is too old. ',
        'Please run cagefsctl --sanity-check directly ',
        'or upgrade it to have full cldiag integration',
    ))
    return ChkResult(res=SKIPPED, msg=advice)


@pretty_name("Check control panel and it's configuration (for DirectAdmin only)")
def check_cp_diag():
    """
    Report detected control panel; for DirectAdmin also verify options.conf
    :return: ChkResult
    """
    doc_anchor = cldiag_doc_link + '#diag_cp'
    fix_motivation = (' Fixing the issue will provide CloudLinux support '
                      'on your control panel. \nSee details: {}').format(doc_anchor)

    # getCP() is called before CP_VERSION is read - presumably it fills that
    # module-level value; verify against cldetectlib
    detect.getCP()
    panel = detect.getCPName()
    if panel == 'Unknown':
        return ChkResult(SKIPPED, "Can't detect contol panel")

    summary = "Control Panel - {}; Version {};".format(panel, detect.CP_VERSION)
    if panel != 'DirectAdmin':
        return ChkResult(OK, summary)
    if detect.da_check_options():
        return ChkResult(OK, summary + ' File "options.conf" is fine')
    failure = summary + ' File "options.conf" has no line "cloudlinux=yes"' + fix_motivation
    return ChkResult(FAILED, failure)


@pretty_name('Check fs.enforce_symlinksifowner is correctly enabled in sysctl conf')
def check_symlinksifowner():
    """
    Check value of fs.enforce_symlinksifowner (value 2 is reported as failure)
    :return: ChkResult
    """
    if detect.is_openvz():
        return ChkResult(SKIPPED, 'Not supported for OpenVZ environment')

    try:
        current_value = detect.get_symlinksifowner()
    except ExternalProgramFailed as err:
        brief = get_short_error_message(
            str(err), 'To see full error run /sbin/sysctl --system')
        template = ('Some parameter in sysctl config has wrong configuration. '
                    'Error: {} It`s recommended to fix it and try again ')
        return ChkResult(FAILED, template.format(brief))

    if current_value != 2:
        return ChkResult(OK, 'fs.enforce_symlinksifowner = {}'.format(current_value))

    fix_motivation = (' Fixing that issue makes server more secure against '
                      'symlink attacks and enables protection of PHP configs '
                      'or other sensitive files. \nSee details: {}'
                      ).format(cldiag_doc_link + '#symlinksifowner')
    return ChkResult(FAILED, 'fs.enforce_symlinksifowner = 2' + fix_motivation)


def binary_check(params):
    """
    Check that binary of an apache module is built with CageFS jail
    :param params: dict with keys 'name', 'status_function' and 'location'
                   (see BINARY_CHECK_PARAMETERS)
    :return: ChkResult
    """
    module_name = params['name'].lower()

    if not os.path.exists('/usr/sbin/cagefsctl'):
        return ChkResult(SKIPPED, "Cagefs is not installed")
    if not wrapper(params['status_function']):
        return ChkResult(SKIPPED, "{} is not enabled".format(params['name']))

    jail_state = detect.check_binary_has_jail(params['location'])
    if jail_state is None:
        not_supported = ("Unable to check {} module binary for "
                         "custom control panel. This feature may be "
                         "added in future updates.")
        return ChkResult(SKIPPED, not_supported.format(params['name']))
    if jail_state:
        return ChkResult(OK, "binary has jail")

    link = cldiag_doc_link + '#check_' + module_name
    fix_motivation = (' Fix that issue to be sure that users run their sites inside CageFS and provide stable '
                      'work of sites that are using apache {} module. This may improve server security'
                      '\nSee details: {}').format(module_name, link)
    return ChkResult(FAILED, "Binary without CageFS jail " + fix_motivation)


@pretty_name('Check suexec has cagefs jail')
def check_suexec():
    """
    Run binary_check with the SuEXEC parameters
    :return: ChkResult
    """
    suexec_params = BINARY_CHECK_PARAMETERS['suexec']
    return binary_check(suexec_params)


@pretty_name('Check suphp has cagefs jail')
def check_suphp():
    """
    Run binary_check with the SuPHP parameters
    :return: ChkResult
    """
    suphp_params = BINARY_CHECK_PARAMETERS['suphp']
    return binary_check(suphp_params)


@pretty_name('Check UsePAM in /etc/ssh/sshd_config')
def check_use_pam():
    """
    Report whether detect.check_SSHd_UsePAM() finds UsePAM enabled
    :return: ChkResult (SKIPPED when the detector returns None)
    """
    pam_state = detect.check_SSHd_UsePAM()
    if pam_state is None:
        return ChkResult(SKIPPED, 'Unable to open SSHd configuration file')
    if pam_state:
        return ChkResult(OK, 'Config is fine')

    fix_motivation = ('Fix the issue to provide correct work of pam_lve module with sshd and '
                      'CageFS ssh sessions'
                      '\nSee details: {}').format(cldiag_doc_link + '#check_usepam')
    return ChkResult(FAILED, 'Line "UsePAM yes" is missing ' + fix_motivation)


@pretty_name('Check the validity of LVE limits on server')
def check_lve_limits():
    # type: () -> ChkResult
    """
    Run LimitsValidator against existing limits and report the outcome;
    a None report from the validator is treated as "limits are valid"
    """
    doc_link = 'https://docs.cloudlinux.com/lve-limits-validation.html'

    validation_report = LimitsValidator().validate_existing_limits()
    if validation_report is not None:
        failed_message = 'Invalid LVE limits on server. See doc: ' + doc_link
        return ChkResult(FAILED, failed_message + '\n' + validation_report)
    return ChkResult(OK, 'Valid LVE limits on server.')


@pretty_name('Check compatibility for PHP Selector')
def check_phpselector():
    """
    Check that current PHP handler setup is compatible with PHP Selector

    1. mod_ruid not present
    2. suphp
    3. mod_lsapi
    4. suexec and (fcgi or cgi)
    5. litespeed
    6. do not support other

    The check is skipped when /etc/cpanel/ea4/is_ea4 is absent and passes
    right away when LiteSpeed config file exists.
    :return: ChkResult
    """

    ok_prefix = 'It looks ok [%s]'
    fail_prefix = (
        "Looks like your PHP handler doesn't support CloudLinux PHP Selector "
        "and as a result does not work "
        "http://docs.cloudlinux.com/index.html?compatiblity_matrix.html [%s]"
        "\nPlease, see: {} and try to fix issue to have working selector".format(cldiag_doc_link + '#check_phpselector')
    )
    supported_handlers = ('cgi', 'fcgi', 'suphp', 'lsapi')

    # do not check for EA3
    if not os.path.exists("/etc/cpanel/ea4/is_ea4"):
        return ChkResult(SKIPPED, "It is not cPanel with EA4, can diag nothing")

    # litespeed check
    litespeed_config_file = '/usr/local/lsws/conf/httpd_config.xml'
    if os.path.exists(litespeed_config_file):
        return ChkResult(OK, ok_prefix % 'Litespeed')

    status = {'suexec': False,
              'suphp': False,
              'lsapi': False}

    handler = None
    # check /etc/cpanel/ea4/php.conf for EA4
    conf_path = "/etc/cpanel/ea4/php.conf"
    if os.path.exists(conf_path):
        try:
            # context manager closes the file even when reading fails
            with open(conf_path, "r") as fd:
                config = [x.strip() for x in fd]
        except IOError as e:
            err = "Can not read %s (%s)" % (conf_path, str(e))
            return ChkResult(FAILED, fail_prefix % err)
        # "default: <version>" line names the default php version
        for line in config:
            if line.startswith("default:"):
                default_ver = (line.split(':')[1]).strip()
                break
        else:
            err = "%s config should have default php version" % conf_path
            return ChkResult(FAILED, fail_prefix % err)

        # "<version>: <handler>" line; the last matching line wins
        for line in config:
            if line.startswith("%s:" % default_ver):
                handler = (line.split(':')[1]).strip()
        if handler not in supported_handlers:
            err = "doesn't support %s handler in ea4/php.conf" % handler
            return ChkResult(FAILED, fail_prefix % err)

    modules = detect.get_apache_modules()
    if modules is not None:
        if 'ruid2_module' in modules:
            return ChkResult(
                FAILED,
                fail_prefix % "It looks like you use mod_ruid. CloudLinux PHP Selector doesn't work properly with it"
            )
        status['suphp'] = 'suphp_module' in modules
        status['lsapi'] = 'lsapi_module' in modules
        status['suexec'] = 'suexec_module' in modules
    if not any([status['suphp'], status['suexec']]):
        return ChkResult(
            FAILED,
            fail_prefix % "It looks like you do not have mod_suphp or mod_suexec installed. "
                          "CloudLinux PHP Selector doesn't work properly without it"
        )
    # suphp module alone is enough; suexec additionally needs a known handler
    # (parentheses only spell out the precedence the expression always had)
    if status['suphp'] or (status['suexec'] and handler in supported_handlers):
        current = "php.conf:%s with %s" % (handler, ', '.join(s for s in status if status[s]))
        return ChkResult(OK, ok_prefix % current)
    err = "Some unknown php handler, perhaps we don't support it [found handler: %s and apache modules: %s]" % (
        '-' if handler is None else handler,
        ', '.join(module for module in status if status[module]))
    return ChkResult(FAILED, fail_prefix % err)


@pretty_name('Check fs.symlinkown_gid')
def check_symlinkowngid():
    """
    Check that web-server user is covered by the group stored in
    /proc/sys/fs/symlinkown_gid: either the web-server GID equals that value
    or the web-server user is listed as a member of that group
    :return: ChkResult
    """
    fix_motivation = 'Fix the issue to provide symlink protection for apache user ' \
                     'and as a result make your Web Server more secure. ' \
                     '\nSee details: {}'.format(cldiag_doc_link + '#check_symlinkowngid')
    ok_res = ChkResult(
        OK, 'Web-server user is protected by Symlink Owner Match Protection')
    warn_msg_tpl = ("Web-server user '{}' is not in protected group "
                    "specified in {}. " + fix_motivation)
    symlinkown_gid_file = "/proc/sys/fs/symlinkown_gid"

    if detect.is_openvz():
        return ChkResult(SKIPPED, 'Not supported for OpenVZ environment')

    detect.get_apache_gid()  # This function fills few module-level variables
    apache_uname = detect.APACHE_UNAME

    try:
        pwd.getpwnam(apache_uname)
    except KeyError:
        return ChkResult(SKIPPED, 'There is no web-server user [{}] in system. '
                                  'Nothing to check'.format(apache_uname))

    try:
        # context manager guarantees the descriptor is released
        with open(symlinkown_gid_file) as gid_file:
            current_symlinkown_gid = int(gid_file.read().strip())
    except Exception as e:
        return ChkResult(FAILED, "Can't read GID from {} with error: {}"
                         .format(symlinkown_gid_file, repr(e)))

    if detect.APACHE_GID == current_symlinkown_gid:
        return ok_res

    try:
        grp_members = grp.getgrgid(current_symlinkown_gid).gr_mem
    except KeyError:    # no such group
        grp_members = []
    # Most often both LiteSpeed and Apache runs under the same user
    if apache_uname in grp_members:
        return ok_res

    return ChkResult(FAILED, warn_msg_tpl.format(apache_uname,
                                                 symlinkown_gid_file))


@pretty_name('Check existence of all user\'s packages')
def check_existance_of_all_users_packages():
    """
    Report users whose package (PLAN= value in /var/cpanel/users/ files)
    has no matching file in /var/cpanel/packages/
    :return: ChkResult (SKIPPED on non-cPanel servers or when there are
             no user files)
    """
    packages_dir_path = '/var/cpanel/packages/'
    users_dir_path = '/var/cpanel/users/'
    excluded_packages_names = ['undefined', 'default', 'cPanel Ticket System temporary user']

    if detect.getCPName() != 'cPanel':
        return ChkResult(SKIPPED, "should be run on cPanel only")

    if not os.listdir(users_dir_path):
        return ChkResult(SKIPPED, "no users on this server")

    # getting users packages
    process = subprocess.Popen(['/bin/grep', '-e', 'PLAN=', '-r'],
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               cwd=users_dir_path,
                               text=True)
    std_out, std_err = process.communicate()

    if process.returncode != 0:
        msg = "error getting user's packages: {}".format(std_err)
        return ChkResult(FAILED, msg)

    all_users_packages = []
    try:
        # std_out sample: <username1>:PLAN=<package_name>\n <username2>:PLAN=<package_name>\n...
        for plan in std_out.strip().split('\n'):
            # split on the first '=' only, so a package name that itself
            # contains '=' is kept whole
            owner_part, package = plan.split('=', 1)
            all_users_packages.append((owner_part.split(':')[0], package.strip()))
    except Exception as e:
        msg = "error processing user's packages: {}".format(e)
        return ChkResult(FAILED, msg)

    # getting exists packages; a set gives constant-time membership tests
    exists_packages = {package for package in os.listdir(packages_dir_path)
                       if os.path.isfile(os.path.join(packages_dir_path, package))}

    # getting not exists users packages
    not_exists_users_packages = ['{}: {}'.format(user, package) for user, package in all_users_packages
                                 if package not in excluded_packages_names and package not in exists_packages]
    if not not_exists_users_packages:
        return ChkResult(OK, "nonexistent user's packages aren't found")

    msg = (
        "Found some nonexistent user's packages. "
        "List of \"user: package\" separated by semicolon: {}. "
        "If you want to apply package limits for those users - assign existing packages to them, "
        "otherwise limits will be applied incorrectly or not applied at all."
        .format('; '.join(not_exists_users_packages))
    )
    return ChkResult(FAILED, msg)


# Config files validated by check_defaults_cfg() and check_php_conf()
DEFAULTS_CFG_PATH = '/etc/cl.selector/defaults.cfg'
PHP_CONF_PATH = '/etc/cl.selector/php.conf'
# Parameter names accepted on the left side of '=' in a php.conf block
PARAM_NAME_LIST = ['Directive', 'Default', 'Type', 'Comment', 'Range', 'Remark']
# Values accepted for the 'Type' parameter
TYPES = ['value', 'list', 'bool']


def parse_php_conf(conf_path=None):
    """
    Parse php.conf and split it into blocks by empty line

    Lines starting with '#' are dropped and do not split a block.
    Any number of consecutive blank lines acts as a single delimiter,
    blank lines before the first block are ignored.
    :param conf_path: file to parse; PHP_CONF_PATH is used when omitted
    :return: list of blocks, each block is a list of stripped lines
    """
    if conf_path is None:
        conf_path = PHP_CONF_PATH

    line_blocks = []
    # block currently being filled; None means the next non-empty line
    # starts a new block
    current_block = None
    with open(conf_path, 'r') as conf:
        for line in conf:
            if line.startswith('#'):
                continue
            stripped = line.strip()
            if stripped:
                if current_block is None:
                    current_block = []
                    line_blocks.append(current_block)
                current_block.append(stripped)
            else:
                # blank line closes the block (if any was open)
                current_block = None
    return line_blocks


def check_block(block):
    """
    Validate one php.conf block
    :param block: list of 'Name = value' lines
    :return: list [result, msg]: result is False when any line has a name
             outside PARAM_NAME_LIST or a 'Type' whose value is missing or
             outside TYPES; msg holds one entry per problem found
    """
    result = True
    problems = []
    for line in block:
        line_parts = line.split('=')
        param_name = line_parts[0].strip()
        if param_name not in PARAM_NAME_LIST:
            result = False
            problems.append('\n' + 'Block %s has wrong param \n' % block_to_string(block))
        if param_name == 'Type':
            # a 'Type' line without '=' has no value at all: report it as
            # invalid instead of failing with IndexError
            if len(line_parts) < 2 or line_parts[1].strip() not in TYPES:
                result = False
                problems.append('\n' + 'Block %s has wrong directive \n' % block_to_string(block))
    return [result, ''.join(problems)]


def block_to_string(block):
    """
    Render block lines as text: a leading newline, then every line
    followed by a newline
    :param block: iterable of lines (items are converted with str)
    :return: string
    """
    return '\n' + ''.join(str(entry) + '\n' for entry in block)


@pretty_name('Checking /etc/cl.selector/php.conf')
def check_php_conf():
    """
    Validate every block of php.conf and collect messages of invalid ones
    :return: ChkResult (SKIPPED when the file does not exist)
    """
    if not os.path.exists(PHP_CONF_PATH):
        return ChkResult(SKIPPED, 'File %s does not exist\n' % PHP_CONF_PATH)

    all_valid = True
    report = ''
    for conf_block in parse_php_conf():
        block_ok, block_msg = check_block(conf_block)
        all_valid = all_valid and block_ok
        if block_msg:
            report = report + '\n' + block_msg

    if all_valid:
        return ChkResult(OK, 'Ok')

    php_ini_doc_link = 'https://docs.cloudlinux.com/custom_php_ini_options.html'
    fix_motivation = ('To fix the issue provide valid format for /etc/cl.selector/php.conf file. '
                      'It is used for PHP Selector and invalid format lead to directives misconfiguration '
                      'and as a result misconfiguration of selector'
                      '\nPlease, read more about php.conf file in {}').format(php_ini_doc_link)
    return ChkResult(FAILED, report + fix_motivation)


@pretty_name('Checking /etc/cl.selector/defaults.cfg')
def check_defaults_cfg():
    """
    Validate defaults.cfg: it must be parseable, define default php version
    in [versions] section and that version must not be disabled.
    Empty items in a comma-separated 'modules' list only produce a warning
    on stderr.
    :return: ChkResult (SKIPPED when the file does not exist)
    """
    fix_motivation = 'Details: this config file is used by php selector and stores it`s global options, ' \
                     'so it is important to keep needed configurations and valid syntax for PHP modules ' \
                     'settings to avoid selector`s misconfiguration' \
                     '\nSee details: {}'.format(cldiag_doc_link + '#check_defaults_cfg')
    if not os.path.exists(DEFAULTS_CFG_PATH):
        return ChkResult(SKIPPED, '%s does not exist\n' % DEFAULTS_CFG_PATH)
    try:
        # SafeConfigParser is a deprecated alias of ConfigParser and is gone
        # in Python 3.12; with interpolation disabled both behave the same
        defaults_cfg = ConfigParser.ConfigParser(interpolation=None,
                                                 strict=False)
        defaults_cfg.read(DEFAULTS_CFG_PATH)
    except Exception as e:
        return ChkResult(FAILED, str(e))
    try:
        default_php_version = defaults_cfg.get('versions', 'php')
    except (ConfigParser.NoOptionError, ConfigParser.NoSectionError):
        return ChkResult(FAILED, 'Default php version is undefined\n' + fix_motivation)
    for section in defaults_cfg.sections():
        if not section.startswith('php'):
            continue
        php_version = section[3:]
        # missing options fall back to the same defaults as before
        state = defaults_cfg.get(section, 'state', fallback='enable')
        modules = defaults_cfg.get(section, 'modules', fallback='')
        if default_php_version == php_version and state == 'disabled':
            return ChkResult(FAILED, 'Default php version {} is disabled\n{}'.format(php_version,
                                                                                     fix_motivation))
        if ',' in modules:
            for name in modules.split(','):
                if not name:
                    sys.stderr.write('Warning: Modules list for version %s is strange\n' % php_version)
    return ChkResult(OK, 'OK')


def get_short_error_message(error, detailed_out, max_error_lines=10):
    """
    Handles error message making it shorter, if it is bigger than max limit
    :param error: error message to make shorter
    :param detailed_out: way for user to get full error manually
    :param max_error_lines: max lines of the original error to keep
    :return: the error itself when it has at most max_error_lines lines;
             otherwise first half of kept lines, '...', remaining kept lines
             taken from the end and detailed_out, joined with newlines
    """
    error_lines = error.split('\n')
    if len(error_lines) <= max_error_lines:
        return error

    budget = max(max_error_lines, 0)
    head_count = budget // 2
    tail_count = budget - head_count
    head = error_lines[:head_count]
    # a [-0:] slice would return the whole list, so an empty tail
    # has to be handled explicitly
    tail = error_lines[-tail_count:] if tail_count else []
    return '\n'.join(head + ['...'] + tail + [detailed_out])